Spring Boot集成kafka完整版

您所在的位置:网站首页 kafka segement Spring Boot集成kafka完整版

Spring Boot集成kafka完整版

2023-04-01 11:43| 来源: 网络整理| 查看: 265

pom.xml添加maven依赖 org.springframework.boot spring-boot-starter-parent 2.0.2.RELEASE org.springframework.kafka spring-kafka

spring boot会自动配置kafka,接下来只要配置yml属性文件和主题名配置。

application.yml配置kafka spring: kafka: bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 producer: retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 application.yml配置主题和消费者组 kafka: topic: group-id: topicGroupId topic-name: - topic1 - topic2 - topic3

新建KafkaTopicProperties

@ConfigurationProperties("kafka.topic") public class KafkaTopicProperties implements Serializable { private String groupId; private String[] topicName; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String[] getTopicName() { return topicName; } public void setTopicName(String[] topicName) { this.topicName = topicName; }

添加KafkaTopicConfiguration

@Configuration @EnableConfigurationProperties(KafkaTopicProperties.class) public class KafkaTopicConfiguration { private final KafkaTopicProperties properties; public KafkaTopicConfiguration(KafkaTopicProperties properties) { this.properties = properties; } @Bean public String[] kafkaTopicName() { return properties.getTopicName(); } @Bean public String topicGroupId() { return properties.getGroupId(); } } 添加自己的service @Service public class IndicatorService { private Logger LOG = LoggerFactory.getLogger(IndicatorService.class); private final KafkaTemplate kafkaTemplate; /** * 注入KafkaTemplate * @param kafkaTemplate kafka模版类 */ @Autowired public IndicatorService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}") public void processMessage(ConsumerRecord record) { LOG.info("kafka processMessage start"); LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value()); // do something ... LOG.info("kafka processMessage end"); } public void sendMessage(String topic, String data) { LOG.info("kafka sendMessage start"); ListenableFuture future = kafkaTemplate.send(topic, data); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data); } @Override public void onSuccess(SendResult result) { LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data); } }); LOG.info("kafka sendMessage end"); } }

至此就可以跑起来了,有什么不明白的可以留言。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3